If you are asked to write a highly scalable Java-based server, it
won't take long to decide to use the Java NIO package. To get your
server running, you will probably spend a lot of time reading blogs
and tutorials to understand the thread synchronization needs of the
NIO Selector class and to deal with common pitfalls.
This article describes the basic architecture of a
connection-oriented NIO-based server. It takes a look at a preferred
threading model and discusses the basic components of such a
server.
Threading Architecture
The first and most intuitive way to implement a multi-threaded
server is to follow the thread-per-connection approach. This
is the traditional pre-Java-1.4 solution, caused by the lack of
non-blocking I/O support in older Java versions. The
thread-per-connection approach uses an exclusive worker thread for
each connection. Within the handling loop, a worker thread waits for
new incoming data, processes the request, returns the response data,
and calls the blocking socket's read method again.
public class Server {
    private ExecutorService executors = Executors.newFixedThreadPool(10);
    private boolean isRunning = true;

    public static void main(String... args) throws ... {
        new Server().launch(Integer.parseInt(args[0]));
    }

    public void launch(int port) throws ... {
        ServerSocket sso = new ServerSocket(port);
        while (isRunning) {
            Socket s = sso.accept();
            executors.execute(new Worker(s));
        }
    }

    private class Worker implements Runnable {
        private LineNumberReader in = null;
        ...

        Worker(Socket s) throws ... {
            in = new LineNumberReader(new InputStreamReader(...));
            out = ...
        }

        public void run() {
            while (isRunning) {
                try {
                    // blocking read of a request (line)
                    String request = in.readLine();

                    // process the request
                    ...
                    String response = ...

                    // return the response
                    out.write(response);
                    out.flush();
                } catch (Exception e) {
                    ...
                }
            }
            in.close();
            ...
        }
    }
}
There is always a one-to-one relationship between simultaneous
client connections and the number of concurrent worker threads.
Because each connection has an associated thread waiting on the
server side, very good response times can be achieved. However,
higher loads require a higher number of running, concurrent threads,
which limits scalability. In particular, long-living connections like
persistent HTTP connections lead to a lot of concurrent worker
threads, which tend to waste their time waiting for new client
requests. In addition, hundreds or even thousands of concurrent
threads can waste a great deal of stack space. Note, for example,
that the default Java thread stack size for Solaris/Sparc is 512 KB;
at that size, 1,000 mostly idle connections reserve roughly 500 MB of
address space for thread stacks alone.
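Where worker threads are created explicitly, the per-thread stack can at least be tuned. A minimal sketch (the 256 KB value is an illustrative assumption; the VM treats the stack-size argument only as a hint and may round or ignore it):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StackSizeDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean ran = new AtomicBoolean(false);

        // The fourth constructor argument is a stack-size hint in bytes;
        // the VM is free to round it or ignore it entirely.
        Thread worker = new Thread(null, () -> ran.set(true),
                                   "small-stack-worker", 256 * 1024);
        worker.start();
        worker.join();

        System.out.println("worker ran: " + ran.get());
    }
}
```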
If the server has to handle a high number of simultaneous clients
and tolerate slow, unresponsive clients, an alternative threading
architecture is needed. The thread-on-event approach
meets these requirements in a very efficient way. The worker
threads are independent from the connections and will only be used to
handle specific events. For instance, if a data
received event occurs, a worker thread will be used to process
the application-specific encoding and service tasks (or at least to
start them). Once this job is complete, the worker will be returned
to the thread pool. This approach requires performing the socket I/O
operations in a non-blocking manner. The socket's read
or write method calls have to be non-blocking.
Additionally, an event system is required; it signals if new data is
available, which in turn initiates the socket read call.
This removes the one-to-one relationship between waiting reads and
occupied threads. The design of such an event-driven I/O system is
described by the Reactor pattern.
The Reactor Pattern
The Reactor pattern, illustrated in
Figure 1, separates the detection of events, such as readiness for
read or readiness for accept, from the processing of
those events. If a readiness event occurs, an event handler is
notified to perform the appropriate processing within dedicated
worker threads.
Figure 1. A NIO-based Reactor pattern implementation
To participate in the event architecture, the connection's
Channel has to be registered on a Selector.
This will be done by calling the register method.
Although this method is part of the SocketChannel, the
channel will be registered on the Selector, not the
other way around.
...
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
// register the connection
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
...
To detect new events, the Selector provides the
capability to ask the registered channels for their readiness events.
By calling the select method, the Selector
collects the readiness events of the registered channels. This method
call blocks until at least one event has occurred, then
returns the number of connections that have become ready
for I/O operations since the last select call. The
selected connections can be retrieved by calling the Selector's
selectedKeys method. This method returns a set of
SelectionKey objects, each of which holds the I/O event status
and a reference to the connection's Channel.
A Selector is held by the Dispatcher.
This is a single-threaded active class that surrounds the
Selector. The Dispatcher is responsible for
retrieving the events and for dispatching the handling of the consumed
events to the EventHandler. Within the dispatch loop,
the Dispatcher calls the Selector's
select method to wait for new events. If at least one
event has occurred, the method call returns, and the associated
channel for each event can be acquired by calling the
selectedKeys method.
...
while (isRunning) {
    // blocking call, to wait for new readiness events
    int eventCount = selector.select();

    // get the events
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();

        // readable event?
        if (key.isValid() && key.isReadable()) {
            eventHandler.onReadableEvent(key.channel());
        }

        // writable event?
        if (key.isValid() && key.isWritable()) {
            key.interestOps(SelectionKey.OP_READ); // reset to read only
            eventHandler.onWriteableEvent(key.channel());
        }
        ...
    }
    ...
}
Based on an event like readiness for read or readiness
for write, the EventHandler will be called by the
Dispatcher to process the event. The
EventHandler decodes the request data, processes the
required service activities, and encodes the response data. Because
worker threads are not forced to waste time waiting for new
requests on an open connection, the scalability and throughput of this
approach are conceptually limited only by system resources such as CPU
and memory. That said, response times won't be as good as with the
thread-per-connection approach, because of the required thread
switches and synchronization. The challenge of the event-driven
approach is therefore to minimize synchronization and optimize
thread management, so that this overhead becomes negligible.
Component Architecture
Most highly scalable Java servers are built on top of the
Reactor pattern. In such servers, the classes of the Reactor
pattern are augmented by additional classes for connection
management, buffer management, and load balancing. The
entry class of such a server is the Acceptor. This
arrangement is shown in Figure 2.
Figure 2. Major components of a connection-oriented server
Acceptor
Every new client connection of a server is accepted by the
single Acceptor, which is bound to the server port. The
Acceptor is a single-threaded active class. Because it
is only responsible for handling the very short-running client
connection request, it is often sufficient to implement the
Acceptor using the blocking I/O model. The
Acceptor gets the handle of a new connection by calling
the ServerSocketChannel's blocking accept
method. The new connection is then registered with a
Dispatcher; after this, the connection participates in
event handling.
Because the scalability of a single Dispatcher is
limited, often a small pool of Dispatchers will be used.
One reason for this limitation is the operating-system-specific
implementation of the Selector. Most popular operating
systems map a SocketChannel to a file handle in a
one-to-one relationship. Depending on the concrete system, the
maximum number of file handles per Selector is limited
in a different way.
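The round-robin balancing behind a call like dispatcherPool.nextDispatcher() can be sketched generically. The following class and method names are illustrative, not part of any framework; in a real server the pool would hold Dispatcher instances, each running its own select loop on a dedicated thread:

```java
import java.util.ArrayList;
import java.util.List;

// Hands out pool members in rotation, so new connections are spread
// evenly across the available dispatchers.
class RoundRobinPool<T> {
    private final List<T> items;
    private int next = 0;

    RoundRobinPool(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    // synchronized because the Acceptor thread and others may race here
    synchronized T nextItem() {
        T item = items.get(next);
        next = (next + 1) % items.size();
        return item;
    }
}
```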
class Acceptor implements Runnable {
    private ServerSocketChannel serverChannel;
    ...

    void init() throws IOException {
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(true);
        serverChannel.socket().bind(new InetSocketAddress(serverPort));
    }

    public void run() {
        while (isRunning) {
            try {
                SocketChannel channel = serverChannel.accept();
                Connection con = new Connection(channel, appHandler);
                dispatcherPool.nextDispatcher().register(con);
            } catch (...) {
                ...
            }
        }
    }
}
In the example code, a Connection object holds the
SocketChannel and an application-level event handler.
These classes will be described below.
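A minimal sketch of such a Connection class follows; the ConcurrentLinkedQueue choice and the untyped handler reference are assumptions, since the article elides these details:

```java
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Ties a channel to its application-level handler and to the two
// thread-safe queues used by the dispatcher-level event handler.
class Connection {
    private final SocketChannel channel;
    private final Object appHandler;   // application-level event handler
    private final Queue<ByteBuffer> readQueue = new ConcurrentLinkedQueue<>();
    private final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();

    Connection(SocketChannel channel, Object appHandler) {
        this.channel = channel;
        this.appHandler = appHandler;
    }

    SocketChannel getChannel()          { return channel; }
    Object getAppHandler()              { return appHandler; }
    Queue<ByteBuffer> getReadQueue()    { return readQueue; }
    Queue<ByteBuffer> getWriteQueue()   { return writeQueue; }
}
```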
Dispatcher
By calling the Dispatcher's register
method, the SocketChannel will be registered on the
underlying Selector. Here is where the trouble comes in.
The Selector manages the registered channels internally
by using key sets. This means that by registering a channel,
an associated SelectionKey will be created and be added
to the Selector's registered key set. At the same time, the
concurrent dispatcher thread could call the Selector's
select method, which also accesses the key set. Because
the key sets are not thread-safe, an unsynchronized registration in
the context of the Acceptor thread can lead to deadlocks
and race conditions. This can be solved by implementing the
selector guard object idiom, which allows suspending the
dispatcher thread temporarily. See "How
to Build a Scalable Multiplexed Server with NIO" (PDF) for an
explanation of this approach.
class Dispatcher implements Runnable {
    private Object guard = new Object();
    …

    void register(Connection con) {
        // retrieve the guard lock and wake up the dispatcher thread
        // to register the connection's channel
        synchronized (guard) {
            selector.wakeup();
            con.getChannel().register(selector, SelectionKey.OP_READ, con);
        }

        // notify the application EventHandler about the new connection
        …
    }

    void announceWriteNeed(Connection con) {
        SelectionKey key = con.getChannel().keyFor(selector);
        synchronized (guard) {
            selector.wakeup();
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }

    public void run() {
        while (isRunning) {
            synchronized (guard) {
                // suspend the dispatcher thread if guard is locked
            }
            int eventCount = selector.select();

            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();

                // read event?
                if (key.isValid() && key.isReadable()) {
                    Connection con = (Connection) key.attachment();
                    dispatcherEventHandler.onReadableEvent(con);
                }

                // write event?
                …
            }
        }
    }
}
After a connection has been registered, the Selector
listens for readiness events of this connection. If an event occurs,
the appropriate callback method of the Dispatcher's
event handler is called, passing the associated connection.
Dispatcher-Level EventHandler
The first activity performed while processing a readiness-for-read
event is to call the channel's read method. In
contrast to the streaming interface, the Channel
interface requires that a read buffer be passed in. Often,
direct-allocated ByteBuffers are used. Direct
buffers reside in native memory, bypassing the Java heap space. By
using direct buffers, socket I/O operations can be performed without
the need to create internal intermediate buffers.
Normally the read call will be performed very
quickly. Depending on the operating system, the socket read operation
often only puts a copy of the received data from the kernel memory
space into the read buffer, which resides in the user-controlled
memory space.
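Such a read with a single, reused direct buffer can be sketched as follows (the 8 KB buffer size is an assumption, and the helper class name is illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// One direct buffer, reused across reads; direct memory lives outside
// the Java heap, so the copy from kernel space avoids an extra
// intermediate heap buffer.
class BufferReader {
    private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(8192);

    byte[] readChunk(ReadableByteChannel channel) throws IOException {
        readBuffer.clear();
        int count = channel.read(readBuffer);   // non-blocking channels may return 0
        if (count <= 0) {
            return new byte[0];
        }
        readBuffer.flip();                      // switch to draining mode
        byte[] data = new byte[readBuffer.remaining()];
        readBuffer.get(data);
        return data;
    }
}
```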
The received data is appended to the connection's
thread-safe read queue for further processing. Based on the
result of the I/O operation, application-specific tasks have to be
processed. Such tasks are handled by the assigned
application-level event handler, which is typically invoked
on a worker thread.
class DispatcherEventHandler {
    ...

    void onReadableEvent(final Connection con) {
        // get the received data
        ByteBuffer readBuffer = allocateMemory();
        con.getChannel().read(readBuffer);
        ByteBuffer data = extractReadAndRecycleRemaining(readBuffer);

        // append it to the read queue
        con.getReadQueue().add(data);
        ...

        // perform further operations (decode, process, encode)
        // on a worker thread
        if (con.getReadQueue().getSize() > 0) {
            workerPool.execute(new Runnable() {
                public void run() {
                    synchronized (con) {
                        con.getAppHandler().onData(con);
                    }
                }
            });
        }
    }

    void onWriteableEvent(Connection con) {
        ByteBuffer[] data = con.getWriteQueue().drain();
        con.getChannel().write(data); // write the data
        ...

        if (con.getWriteQueue().isEmpty()) {
            if (con.isClosed()) {
                dispatcher.deregister(con);
            }
        } else {
            // there is remaining data to write
            dispatcher.announceWriteNeed(con);
        }
    }
}
Within the application-specific tasks, data is encoded,
services are performed, and data is written. Writing
data appends it to the connection's write queue and calls the
Dispatcher's announceWriteNeed method.
This method causes the Selector to listen for
readiness-for-write events. If such an event occurs, the
Dispatcher-level event handler's
onWriteableEvent method is invoked. It gets the data
from the connection's write queue and performs the required write I/O
operation. Trying to write data directly, bypassing this
event approach, can end in deadlocks and race conditions.
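The write queue used in this flow can be sketched as a small thread-safe wrapper; the ConcurrentLinkedQueue choice and the exact method shapes are assumptions matching the drain/isEmpty calls in the example code:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Application threads append buffers; the dispatcher-level handler
// drains everything queued so far on a writable event.
class WriteQueue {
    private final Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();

    void add(ByteBuffer data) {
        queue.add(data);
    }

    boolean isEmpty() {
        return queue.isEmpty();
    }

    // drain all pending buffers, suitable for a gathering
    // channel.write(ByteBuffer[]) call
    ByteBuffer[] drain() {
        List<ByteBuffer> drained = new ArrayList<>();
        ByteBuffer buf;
        while ((buf = queue.poll()) != null) {
            drained.add(buf);
        }
        return drained.toArray(new ByteBuffer[0]);
    }
}
```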
Application-Level EventHandler
In contrast to the Dispatcher's event handler, the
application-specific event handler listens for higher-level
connection-oriented events, like connection established,
data received, or connection disconnected. The concrete
event handler design is one of the major differences between NIO
server frameworks like SEDA, MINA, or emberIO.
Such frameworks often implement a multi-staged architecture, where
chains of event handlers can be used. This allows adding handlers
like SSLHandler or DelayedWriteHandler,
which intercept the request/response processing. The following
example shows an application-level handler based on the xSocket framework. The
xSocket framework supports different handler interfaces that define
callback methods to be implemented by application-specific code.
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
    private static final String DELIMITER = ...
    private Mailbox mailbox = ...
    private String user = null;

    public static void main(String... args) throws ... {
        new MultithreadedServer(110, new POP3ProtocolHandler()).run();
    }

    public boolean onConnect(INonBlockingConnection con) throws ... {
        if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
            con.setWriteTransferRate(5); // reduce transfer rate: 5 bytes/sec
        }
        con.write("+OK My POP3-Server" + DELIMITER);
        return true;
    }

    public boolean onData(INonBlockingConnection con) throws ... {
        String request = con.readStringByDelimiter(DELIMITER);
        if (request.startsWith("QUIT")) {
            mailbox.close();
            con.write("+OK POP3 server signing off" + DELIMITER);
            con.close();
        } else if (request.startsWith("USER")) {
            this.user = request.substring(4).trim();
            con.write("+OK enter password" + DELIMITER);
        } else if (request.startsWith("PASS")) {
            String pwd = request.substring(4).trim();
            boolean isAuthenticated = authenticator.check(user, pwd);
            if (isAuthenticated) {
                mailbox = MailBox.openAndLock(user);
                con.write("+OK mailbox locked and ready" + DELIMITER);
            } else {
                ...
            }
        } else if (...) {
            ...
        }
        return true;
    }
}
To ease access to the underlying read and write queues, the
Connection object provides several convenience
read and write methods for stream- and
channel-oriented operations.
When a connection is closed, the underlying implementation initiates
a writable-event round trip to flush the write queue. The connection
will be terminated after the remaining data has been written. Besides
such a controlled termination, connections can be disconnected for
other reasons. For instance, hardware malfunctions could cause the
termination of a TCP-based connection. Such a situation can only be
detected by performing read or write operations on the socket, or by
idle timeouts. Most NIO frameworks provide a built-in function to
handle such uncontrolled terminations.
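An idle-timeout check of the kind such frameworks provide can be sketched as a periodic sweep over the open connections. All names here are illustrative; a real framework would integrate this with the selector loop or a scheduler:

```java
import java.util.List;

// Closes connections whose last activity is older than the timeout.
// The real close path would trigger the writable-event flush described above.
class IdleWatchdog {
    static class Conn {
        long lastActivityMillis;
        boolean closed = false;
        Conn(long lastActivityMillis) { this.lastActivityMillis = lastActivityMillis; }
        void close() { closed = true; }
    }

    private final long timeoutMillis;

    IdleWatchdog(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // invoked periodically, e.g. by a ScheduledExecutorService
    void check(List<Conn> connections, long nowMillis) {
        for (Conn con : connections) {
            if (nowMillis - con.lastActivityMillis > timeoutMillis) {
                con.close();   // uncontrolled termination surfaces here
            }
        }
    }
}
```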
Conclusion
An event-driven non-blocking architecture is a fundamental layer
to implement highly efficient, scalable, and reliable servers. The
challenge is to minimize the thread synchronization overhead and to
optimize the connection/buffer management. This will be the hardest
part to program.
But there is no need to reinvent the wheel. Server frameworks like
xSocket, emberIO, SEDA, or MINA abstract the low-level event handling
and thread management to ease the creation of highly scalable
servers. Most of these server frameworks also support features like
SSL or UDP, which haven't been discussed in this article.
Resources
"Scalable IO in Java" (PDF) describes event-driven processing using
Java NIO.
Unix Network Programming: The Sockets Networking API gives a good
overview of network programming in general, and a good impression of
what happens behind the Java I/O operations at the
operating-system level.
xSocket is an LGPL, NIO-based library for building network
applications. Most of the example code in this article is based on
xSocket.
Gregor Roth
works as a software architect at United Internet group, a leading
European internet service provider.
Cool short post... bookmarked for the next time I'm writing a
server.
For thread safety on multi-cpu machines you should make the
boolean state variable isRunning volatile and the
ExecutorService and guard should be final. Otherwise
different threads on different cores may see stale values.
It may also be more efficient to use a Lock instead of
synchronized... especially since the above changes would fix any
variable visibility problems.
Also, is it really safe to simply synchronize on con in
onReadableEvent, given that there are other operations on con
elsewhere in the code that will not be synchronized. Maybe it
is... I'd have to sit down and understand the flow in more
detail.
Found the link in response to our post on the
Java Blog
First of all, it is good style to declare attributes as final
if they have been designed to be immutable. I should have done
this in my examples for attributes like guard or executors.
Because the executors and the guard object never change,
there wouldn't be situations where threads see
different values. That means the implementation will work as
predicted. But the variable isRunning should have been
declared volatile, as you mentioned.
I don't see the advantage of using a
java.util.concurrent.locks.Lock object instead of
synchronized in my examples. My intention was to keep the
examples simple and correct. Whether to use Lock objects
depends on the concrete implementation. For example, if you
have to implement algorithms like hand-over-hand locking, you
will use Lock objects.
The idea of synchronizing on con is to avoid concurrent
callback calls (performed by the worker threads) for
the same connection instance. That means other callback
methods like onConnect or onIdleTimeout have to be
synchronized in the same way. By doing this, the callback
methods will be performed in a serialized manner.
Synchronizing the connection's methods like write(…) or
readInt() would be another topic.
Have you looked at Grizzly, https://grizzly.dev.java.net ? It
just recently went open source. It's very performant and highly
scalable.
Mina is very good
2007-02-13 claudio
I used Apache MINA last year on a project, to monitor a
range of servers. What can I say? MINA is very well designed
and, better still, it performs well.